{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "9d46c4e3-8e24-4434-b9e1-b9d0cd6a97fb",
   "metadata": {},
   "source": [
    "## 主题配置\n",
    "\n",
    "主题作为消息的归类，可以再细分为一个或多个分区，分区也可以看作对消息的二次归类。分区的划分为 Kafka 提供了可伸缩性、水平扩展的功能，还通过多副本机制来为 Kafka 提供数据冗余以提高数据可靠性。\n",
    "\n",
    "### 使用 Kafka admin client 管理主题\n",
    "kafka-python提供了KafkaAdminClient类，用于管理Kafka集群中的主题、分区等信息。主要功能：\n",
    "\n",
    "* 创建主题：CreateTopicsResult createTopics(Collection newTopics)。\n",
    "* 删除主题：DeleteTopicsResult deleteTopics(Collection topics)。\n",
    "* 列出所有可用的主题：ListTopicsResult listTopics()。\n",
    "* 查看主题的信息：DescribeTopicsResult describeTopics(Collection topicNames)。\n",
    "* 查询配置信息：DescribeConfigsResult describeConfigs(Collection resources)。\n",
    "* 修改配置信息：AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)。\n",
    "* 增加分区：CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)\n",
    "\n",
    "以下是一个简单的示例代码片段，展示如何使用KafkaAdminClient创建主题"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "711efefc-212e-49b4-9541-d9bb9fccfd90",
   "metadata": {},
   "outputs": [],
   "source": [
    "from kafka.admin import KafkaAdminClient, NewTopic\n",
    "\n",
    "admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')\n",
    "\n",
    "# 创建一个名为'my_topic'的主题，其中包含3个分区和2个副本\n",
    "topic = NewTopic(name='my_topic', num_partitions=3, replication_factor=2)\n",
    "admin_client.create_topics([topic])"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3e1e53d7-02e6-4907-bc6d-46a3565e8eff",
   "metadata": {},
   "source": [
    "### 为主题选择合适的分区数\n",
    "选择合适的分区数是一个需要平衡吞吐量和资源利用率的问题。一般而言，分区数越多，能够支持的并发消费者就越多，吞吐量也就越高。但是，分区数过多也会导致资源利用率下降，增加管理和维护的复杂度。\n",
    "\n",
    "根据$kafka$官方文档的建议，可以考虑使用以下公式来估算合适的分区数：$P = max(0, ceil(Q / S))$，其中$P$是分区数，$Q$是期望的每秒消息处理量，$S$是每个分区每秒可处理的消息量。此外，也可以根据实际需求和负载情况来进行调整。\n",
    "\n",
    "参考文献：\n",
    "1. [知乎](https://www.zhihu.com/question/30182302)\n",
    "2. [掘金](https://juejin.cn/post/6844904102400916494)\n",
    "3. [知乎专栏](https://zhuanlan.zhihu.com/p/143622961)\n",
    "4. [CSDN博客](https://blog.csdn.net/small_snail/article/details/109067374)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "73cd31f6-6a13-4390-b061-77b34ee139c8",
   "metadata": {},
   "source": [
    "### 分区重分配\n",
    "\n",
    "kafka-reassign-partitions.sh 是 Kafka 提供的一个脚本，用于重新分配 Kafka 集群中某个分区的副本所在的节点。在 Kafka 集群中，每个分区都有多个副本，用于提高数据的可靠性和容错性。当某个节点宕机或网络异常时，可能会导致该节点上的分区副本不可用，此时需要将该分区副本重新分配到其他节点上，以确保数据可用性。\r\n",
    "\r\n",
    "kafka-reassign-partitions.sh 脚本可以通过执行一个 JSON 文件来重新分配分区副本，其中包含了要重新分配的分区和对应的副本所在的节点。该脚本需要指定 Kafka 集群的相关配置信息，例如 ZooKeeper 的地址、Kafka 的 broker 列表等。具体使用方法可以参考 Kafka 官方文档中的相关说明。\r\n",
    "\r\n",
    "需要注意的是，重新分配分区副本是一项比较复杂的操作，需要谨慎执行，以免影响 Kafka 集群的稳定性和可用性。同时，在执行该操作前需要进行充分的测试和验证，以确保操作的正确性和可行性。移。"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3689d6e7-efcd-4526-afe9-70e56f665acc",
   "metadata": {},
   "source": [
    "### 优先副本的选举\n",
    "\n",
    "kafka-leader-election.sh 是 Kafka 提供的一个脚本，用于手动触发 Kafka 集群中某个分区的 leader 选举过程。在 Kafka 集群中，每个分区都有一个 leader 节点，负责处理该分区的读写请求。当某个节点宕机或网络异常时，可能会导致该节点上的分区 leader 不可用，此时需要进行 leader 选举，选择一个新的节点作为该分区的 leader。\r\n",
    "\r\n",
    "kafka-leader-election.sh 脚本可以手动触发 leader 选举过程，以便及时恢复故障。该脚本需要指定要进行 leader 选举的分区和 Kafka 集群的相关配置信息，例如 ZooKeeper 的地址、Kafka 的 broker 列表等。具体使用方法可以参考 Kafka 官方文档中的相关说明。\r\n",
    "\r\n",
    "需要注意的是，手动触发 leader 选举可能会对集群造成一定的负担，因此在正式环境中应该谨慎使用，并尽可能采取自动化的方式进行故障恢复。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7a55ab4-09ce-43ab-b466-66a294423e74",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
